0%

从Mpsc到RingBuffer(一)- Mpsc

前言

其实一开始没有接触过这个,但是看Netty源码的时候,发现Netty的对象池使用了MpscQueue,感觉还挺有意思的。

MpscQueue主要针对的是单消费者,多生产者的情况,实现上是LockFree的,这里的Lock Free,一般都是指用了CAS解决并发问题。

单消费者单生产者

我们先一步一步来,先考虑单消费者单生产者的情况。

这种情况下,除了Lock Free,还可以做到Wait Free。

数组存储

如果我们使用数组作为存储的话,维护一个ReadIndex和WriteIndex。

需要保证WriteIndex > ReadIndex就行。

每次读的时候,判断队列是否为空,每次写的时候判断队列是否已经满了。

链表存储

如果我们使用链表作为存储的话,原理和数组类似,可以维护一个Head节点和一个Tail节点。

每次写入的时候

1
2
3
Node node = newNode();
tail.next = node;
tail = node;

每次读取的时候

1
2
3
4
5
6
if (head == tail) {
return false; //队列为空
}
Node node = head;
head = head.next;
return node;

单消费者,多生产者

这里我只想对于链表的实现,因为这是Netty的默认实现方式。

对于数组的实现方式,等大家看了RingBuffer的实现方式之后,想必自然就懂了。

在链表的实现上,对于单消费者,多生产者,其实对于消费者端的代码而言,区别和单消费者单生产者不大。

但是由于生产者可能有多个,所以对于tail指针的操作,多线程下是不安全的。

在Netty的Mpsc中,引入CAS操作,对tail指针进行原子操作:

1
2
3
4
5
6
7
8
MpscLinkedNode tail = new MpscLinkedNode();

private MpscLinkedQueueNode<E> replaceTail(MpscLinkedQueueNode<E> node) {
return getAndSet(node);
}
public final MpscLinkedNode getAndSet(MpscLinkedNode newValue) {
return (MpscLinkedNode)unsafe.getAndSetObject(this, valueOffset, newValue);
}

这里的valueOffset就是tail对象的地址。

replaceTail这个函数,接收一个新的Node节点,将Tail节点替换成新的Node阶段,同时返回旧的Tail节点。

Offer操作

1
2
3
4
5
6
public boolean offer(E value) {
MpscLinkedQueueNode<E> newTail = new DefaultNode<E>(value)
MpscLinkedQueueNode<E> oldTail = replaceTail(newTail); // 1
oldTail.setNext(newTail); // 2
return true;
}

生产者加元素的代码不多,算起来就2行。

我们用图例来演示这2步究竟做了什么,首先,我们有一个单链表的结构。

image-20200826221709219

第一步,单线程

image-20200826221709219

第一步,原子替换Tail节点,将新的节点设置为Tail节点。

但是这一步操作完之后,其实并没有改变之前的节点的Next指向,上一个节点的Next还是指向的之前的Tail节点。

第二步,单线程

image-20200826221709219

第二步执行完,才完成整个Next指针的链接。

第一步,多线程

image-20200826221709219

假设第一步被多线程并发了,现在有2个线程同时执行完了第一步。

这个时候,整个链表的结构看起来就是这样的。

后面两个接待是断开的。

第二步,多线程

image-20200826221709219

但是没关系,因为对于每个线程而言,都有自己的newTail和oldTail,这些newTail和oldTail相互串联了起来。

这2个线程结束之后,又是一个完整的链表。

take

对于消费者而言,因为没有竞争,其实连CAS都不需要。

1
2
3
4
5
6
7
8
9
10
11
12
13
private MpscLinkedQueueNode<E> peekNode() {
for (;;) {
final MpscLinkedQueueNode<E> head = headRef.get();
final MpscLinkedQueueNode<E> next = head.next();

if (next != null) {
return next;
}
if (head == getTail()) {
return null;
}
}
}

先看peekNode方法,因为head不保存数据,同时在多线程并发的情况下,可能会出现节点之间还没有被串联起来的情况。

这里使用了for(;;)去等待数据。

同时如果head == tail,表示队列中没有数据。

看完了peek,我们再来看poll方法

1
2
3
4
5
6
7
8
9
10
public E poll() {
final MpscLinkedQueueNode<E> next = peekNode();
if (next == null) {
return null;
}
MpscLinkedQueueNode<E> oldHead = headRef.get();
headRef.lazySet(next);
oldHead.setNext(null);
return next.clearMaybe();
}

这里就是拿到就的Head节点,把他置空,将next节点置为新的Head节点。

参考文章

https://blog.csdn.net/dingguohang/article/details/55252969